package com.example.leader;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LeaderElection implements Watcher {


    private Logger logger = LoggerFactory.getLogger(LeaderElection.class);

    private String ELECTION_NAMESPACE;
    private static final int SESSION_TIMEOUT = 3000;
    private final String zkAddress;
    private ZooKeeper zooKeeper;
    private String currentZnodeName;

    public LeaderElection(String zkAddress, String instance) {
        this.zkAddress = zkAddress;
        if (instance == null) {
            instance = "/election_NONE";
            logger.warn("config instance is null ,use default value:{}", instance);
        }
        this.ELECTION_NAMESPACE = "/election_" + instance;
    }

    public void connect() throws IOException, InterruptedException {
        zooKeeper = new ZooKeeper(zkAddress, SESSION_TIMEOUT, this);
    }

    public void close() throws InterruptedException {
        zooKeeper.close();
    }

    @Override
    public void process(WatchedEvent event) {
        switch (event.getType()) {
            case None:
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    logger.info("Connected to ZooKeeper server");
                } else if (event.getState() == Event.KeeperState.Disconnected) {
                    logger.info("Disconnected from ZooKeeper server");
                } else if (event.getState() == Event.KeeperState.Expired) {
                    logger.info("Session expired, trying to reconnect...");
                    try {
                        reconnect();
                    } catch (IOException | InterruptedException | KeeperException e) {
                        logger.error("Failed to reconnect: " + e.getMessage());
                    }
                }
                break;
            case NodeDeleted:
                if (event.getPath().equals(ELECTION_NAMESPACE + "/" + currentZnodeName)) {
                    logger.info("Node " + currentZnodeName + " deleted, trying to re-elect a leader...");
                    try {
                        runForLeader();
                    } catch (InterruptedException | KeeperException e) {
                        logger.error("Failed to re-elect a leader: " + e.getMessage());
                    }
                }
                break;
        }
    }

    public void reconnect() throws IOException, InterruptedException, KeeperException {
        close();
        connect();
        runForLeader();
    }

    public void runForLeader() throws InterruptedException, KeeperException {
        createElectionNodeIfNotExists();

        String znodePrefix = "n_";
        String znodeFullPath = zooKeeper.create(ELECTION_NAMESPACE + "/" + znodePrefix, new byte[]{}, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        logger.info("Created znode " + znodeFullPath);

        currentZnodeName = znodeFullPath.replace(ELECTION_NAMESPACE + "/", "");

        while (true) {
            List<String> children = zooKeeper.getChildren(ELECTION_NAMESPACE, false);
            Collections.sort(children);

            String smallestChild = children.get(0);

            if (currentZnodeName.equals(smallestChild)) {
                logger.info("I am the leader");
                return;
            } else {
                int previousIndex = Collections.binarySearch(children, currentZnodeName) - 1;
                String previousZnodeName = children.get(previousIndex);

                Stat stat = zooKeeper.exists(ELECTION_NAMESPACE + "/" + previousZnodeName, this);
                if (stat == null) {
                    logger.info("Node " + previousZnodeName + " doesn't exist, trying to re-elect a leader...");
                    runForLeader();
                    return;
                }
            }

            Thread.sleep(1000);
        }
    }

    private void createElectionNodeIfNotExists() throws KeeperException, InterruptedException {
        Stat stat = zooKeeper.exists(ELECTION_NAMESPACE, false);
        if (stat == null) {
            zooKeeper.create(ELECTION_NAMESPACE, new byte[]{}, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            logger.info("Created election node " + ELECTION_NAMESPACE);
        }
    }

}
